#!/usr/bin/env python
#-*-coding=utf-8-*-

import sys
from twisted.internet import task ,reactor
import json
import config.glob_conf
import requests
from requests.auth import HTTPBasicAuth
import datetime
from scpy.logger import get_logger
from pymongo import MongoClient
from kafka import SimpleConsumer,KafkaClient,SimpleProducer,KafkaConsumer

from stompest.config import StompConfig
from stompest.sync import Stomp

# Python 2: force utf-8 as the process-wide default codec so implicit
# str<->unicode conversions of Chinese company names don't raise
# UnicodeDecodeError.
reload(sys)
sys.setdefaultencoding(u'utf-8')

logger = get_logger(__file__)

# MongoDB: source collection holding the company names to be crawled,
# filtered per-province by the producer functions below.
conn = MongoClient(config.glob_conf.MONGO_HOST,27017)
db = conn[config.glob_conf.NAME_DB]
collection =db[config.glob_conf.NAME_COLL]

# Crawl-progress log: _id is the normalized company name; documents carry a
# 'status' field ('exception' marks a failed attempt — see name_producer).
db_log = conn['crawler_log']
collection_log = db_log['saic_ids_log']



# Province abbreviation -> ActiveMQ (STOMP) queue destination.
# The Kafka topic name is derived from the last path segment (e.g. 'saic_cq').
QUEUE_DICT = {
    'cq':'/queue/saic_cq',
    'tj':'/queue/saic_tj',
    'sc':'/queue/saic_sc',
    'gd':'/queue/saic_gd',
    'hn':'/queue/saic_hn',
    'js':'/queue/saic_js',
    'nmg':'/queue/saic_nmg',
    'yn':'/queue/saic_yn',
    'hlj':'/queue/saic_hlj',
    'hen':'/queue/saic_hen',
    'ah':'/queue/saic_ah',
    'gx':'/queue/saic_gx',
    'bj':'/queue/saic_bj',
    'ln':'/queue/saic_ln',
    'xj':'/queue/saic_xj',
    'jx':'/queue/saic_jx',
    'sd':'/queue/saic_sd',
    'fj':'/queue/saic_fj',
    'gs':'/queue/saic_gs',
    'jl':'/queue/saic_jl',
    'hb':'/queue/saic_hb',
    'sx':'/queue/saic_sx',
    'qh':'/queue/saic_qh',
    'xz':'/queue/saic_xz',
    'sax':'/queue/saic_sax',
    'heb':'/queue/saic_heb',
    }


# Kafka: buffering layer between Mongo and the rate-limited MQ queues.
# NOTE(review): config key is spelled KAFAKA_HOST — matches config, keep as is.
kafka = KafkaClient(config.glob_conf.KAFAKA_HOST)

# Module-level STOMP client used by kafka_consumer(); note it connects at
# import time, so importing this module has network side effects.
CONFIG = StompConfig(config.glob_conf.MQ_HOST)
client = Stomp(CONFIG)
client.connect()


def kafka_producer(province):
    """
    组织机构代码放到queue 里面
    """
    index = 0
    producer = SimpleProducer(kafka,async=True,batch_send_every_n=100,batch_send_every_t=60)
    for item in collection.find({'province':province}).batch_size(30):
        company_name = item.get('companyName')
        company_name = company_name.replace("(","（").replace(")","）")
        province = item.get('province')
        message = {'key':company_name,'porvince':province}
        producer.send_messages(QUEUE_DICT.get(province).split('/')[-1],json.dumps(message))
        index+=1
        if index%10000==0:
            logger.info('add message in kafka: %d'%index)


def kafka_consumer(count, province):
    """Forward up to *count* messages from the province's Kafka topic to its MQ queue.

    :param count: maximum number of messages to forward; values <= 0 mean the
                  MQ queue is already full and nothing is consumed.
    :param province: province abbreviation, key into QUEUE_DICT.
    """
    logger.info('start kafka consumer')
    logger.info('count: %s' % count)
    if count <= 0:
        # BUGFIX: the original entered the consumer loop even with count <= 0,
        # fetching one message and returning without forwarding it — with
        # auto-commit enabled that message could be lost. Bail out early.
        logger.info('queue has no free slots, skip consuming')
        return
    consumer = KafkaConsumer(QUEUE_DICT.get(province).split('/')[-1],
                             bootstrap_servers=[config.glob_conf.KAFAKA_HOST],
                             group_id='saic_group_1',
                             auto_commit_enable=True,
                             auto_commit_interval_ms=1 * 1000,
                             fetch_message_max_bytes=1024 * 10240,
                             fetch_min_bytes=1024 * 1024,
                             fetch_wait_max_ms=100,
                             auto_offset_reset='smallest'
                             )

    temp_count = 0
    for message in consumer:
        temp_count += 1
        # Relay to ActiveMQ; 'persistent' so the broker keeps the message
        # across restarts.
        client.send(QUEUE_DICT.get(province), message.value, headers={'persistent': 'true'})
        consumer.task_done(message)
        if temp_count % 100 == 0:
            logger.info('add message to mq: %d' % temp_count)
        if temp_count == count:
            logger.info('finish :%d' % count)
            consumer.commit()
            return


def mq_producer_timer(province):
    """Periodic task: top up the province's MQ queue from Kafka.

    Queries the ActiveMQ admin HTTP API for the current queue depth and
    forwards just enough Kafka messages to bring it back to QUEUE_MAX_SIZE.

    :param province: province abbreviation, key into QUEUE_DICT.
    """
    logger.info('start mq producer')

    queue_name = QUEUE_DICT.get(province).split('/')[-1]
    # NOTE(review): admin credentials are hard-coded; should live in config.
    resp = requests.get(config.glob_conf.QUEUE_INFO_URL % queue_name,
                        auth=HTTPBasicAuth('admin', 'admin'))

    # The original reused `queue_size` for both the HTTP response object and
    # the integer depth; keep them distinct for readability.
    queue_json = json.loads(resp.content)
    queue_size = queue_json.get('value').get('QueueSize')
    logger.info('queue_size: %s' % queue_size)
    count = config.glob_conf.QUEUE_MAX_SIZE - queue_size
    if count > 0:
        kafka_consumer(count, province)
    else:
        # Queue already at/over capacity; don't hand a non-positive budget
        # to kafka_consumer (the original could drop a fetched message).
        logger.info('queue full (%d), nothing to add' % queue_size)


def name_producer(province, queue):
    """Send every unprocessed company name for *province* from Mongo to *queue*.

    Skips names already recorded in the crawl log (`saic_ids_log`) with a
    non-'exception' status, i.e. only new names and failed retries are queued.

    :param province: province abbreviation used to filter the Mongo collection.
    :param queue: STOMP destination, e.g. '/queue/saic_cq'.
    """
    # Use a private STOMP connection; the original rebound the module-level
    # CONFIG/client names, shadowing the shared connection.
    local_client = Stomp(StompConfig(config.glob_conf.MQ_HOST))
    local_client.connect()
    try:
        for row in collection.find({'province': province}).batch_size(30):
            company_name = row.get('companyName')
            # Normalize half-width parentheses to full-width (SAIC convention).
            company_name = company_name.replace("(", u"（").replace(")", u"）")
            check_data = collection_log.find_one({'_id': company_name})
            # Already crawled and did not fail -> skip.
            if check_data and check_data['status'] != 'exception':
                logger.info('in db %s' % company_name)
                continue
            local_client.send(queue, json.dumps({'key': company_name}),
                              headers={'persistent': 'true'})
            logger.info(company_name)
    finally:
        # BUGFIX: the original leaked the connection; always disconnect.
        local_client.disconnect()


if __name__ == '__main__':
    # Usage: script.py <province> <mq|kafka>
    #   mq    -- loop forever, topping up the MQ queue from Kafka hourly
    #   kafka -- one-shot: load all names for the province into Kafka
    # (`sys` is already imported at the top of the file; the original
    # re-imported it here redundantly.)
    if len(sys.argv) < 3:
        logger.info('usage: %s <province> <mq|kafka>' % sys.argv[0])
        sys.exit(1)
    province = sys.argv[1]
    t_t = sys.argv[2]
    if t_t == 'mq':
        logger.info('start loop')
        # Refill the queue once per hour until killed.
        l = task.LoopingCall(mq_producer_timer, province)
        l.start(1 * 60 * 60)
        reactor.run()
    elif t_t == 'kafka':
        kafka_producer(province)
    else:
        logger.info(u'参数错误,mq:往q里加东西 or kafka:往kafka加东西')


